
package com.shockweb.proxy;


import java.io.UnsupportedEncodingException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import com.shockweb.bridge.DataAgreement;
import com.shockweb.bridge.OperationDefine;
import com.shockweb.bridge.ServiceRequest;
import com.shockweb.common.log.LogManager;
import com.shockweb.proxy.config.ProxyConfig;
import com.shockweb.utils.Convert;
import com.shockweb.common.International;
import com.shockweb.common.serializable.SerializableObject;
import com.shockweb.common.context.ContextManager;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * Netty inbound handler for the proxy's micro-service channel.
 *
 * <p>Each inbound {@link ByteBuf} is copied into a byte array, its operation byte and UUID are
 * extracted via {@link DataAgreement}, and the request is dispatched by operation:
 * heartbeats are ignored, service requests are forwarded through the client manager of
 * {@link ProxyServer}, and stop requests shut the proxy down. The class also keeps
 * process-wide call/error/in-flight/time-out counters.
 *
 * @author 彭明华
 * Created 2018-01-03
 */
public class ProxyNettyHandler extends ChannelInboundHandlerAdapter {

    /**
     * Listener shared by both {@code sendData} overloads; it only logs a failed write and
     * holds no per-channel state.
     */
    private static final ChannelFutureListener LOG_SEND_FAILURE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            if (!f.isSuccess()) {
                LogManager.errorLog(ProxyNettyHandler.class, "Message sending failed", f.cause());
            }
        }
    };

    /**
     * Number of ByteBuf messages received (heartbeats included).
     * Static, hence shared by every handler instance; atomic so that concurrent
     * increments from different channels are not lost.
     */
    private static final AtomicLong called = new AtomicLong();

    /**
     * Number of requests whose handling raised an exception.
     */
    private static final AtomicLong error = new AtomicLong();

    /**
     * Number of requests currently being handled.
     */
    private static final AtomicLong doing = new AtomicLong();

    /**
     * Number of requests whose handling took longer than the configured service time-out.
     */
    private static final AtomicLong timeOut = new AtomicLong();

    /**
     * Proxy configuration; read here for the service time-out threshold.
     */
    private final ProxyConfig config;

    /**
     * ContextManager key under which the request start time is stored.
     */
    String key = "START_TIME";

    /**
     * Creates a handler bound to the given proxy configuration.
     *
     * @param config proxy configuration
     */
    public ProxyNettyHandler(ProxyConfig config) {
        this.config = config;
    }

    /**
     * Returns the number of ByteBuf messages received so far.
     *
     * @return call count
     */
    public static long getCalled() {
        return called.get();
    }

    /**
     * Returns the number of requests that ended with an exception.
     *
     * @return error count
     */
    public static long getError() {
        return error.get();
    }

    /**
     * Returns the number of requests currently being handled.
     *
     * @return in-flight request count
     */
    public static long getDoing() {
        return doing.get();
    }

    /**
     * Returns the number of requests that exceeded the configured service time-out.
     *
     * @return time-out count
     */
    public static long getTimeOut() {
        return timeOut.get();
    }

    /**
     * Handles one inbound message.
     *
     * <p>Non-ByteBuf messages are logged and dropped. For a ByteBuf the readable bytes are
     * copied out and the operation byte decides what happens:
     * <ul>
     *   <li>{@code ALIVE}: heartbeat, no reply;</li>
     *   <li>{@code REQ_SERVICE}: the payload is deserialized to find the target space/service,
     *       forwarded through the matching client's {@code rpc} call, and the raw result is
     *       written back;</li>
     *   <li>{@code REQ_STOP}: the proxy server is stopped and {@code RES_SUCCESS} is returned;</li>
     *   <li>anything else: logged, no reply.</li>
     * </ul>
     * Failures are logged and answered with {@code RES_ERROR}. The buffer is always released
     * and the in-flight counter always decremented before returning.
     *
     * @param ctx channel handler context
     * @param msg inbound message, expected to be a {@link ByteBuf}
     * @throws Exception if sending the error reply for a header-parsing failure itself fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof ByteBuf)) {
            // NOTE(review): a reference-counted non-ByteBuf message is not released here;
            // confirm whether upstream handlers can ever emit one.
            LogManager.errorLog(this.getClass(), new ProxyException("接收到非法数据,非ByteBuf数据"));
            return;
        }
        ByteBuf buf = (ByteBuf) msg;
        called.incrementAndGet();
        doing.incrementAndGet();
        try {
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);
            byte operation = DataAgreement.resolutionOperation(data);
            if (operation == OperationDefine.ALIVE.value()) {
                // Heartbeat: nothing to do; the outer finally still releases the buffer.
                return;
            }
            String uuid = DataAgreement.resolutionUUID(data);
            // Start time is kept in ContextManager (presumably per-thread storage; confirm)
            // and read back in the finally block below for the time-out statistic.
            ContextManager.putValue(key, System.currentTimeMillis());
            // The payload starts right after the operation byte and the UUID.
            int offset = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
            try {
                if (operation == OperationDefine.REQ_SERVICE.value()) {
                    ServiceRequest req = (ServiceRequest) Convert.convertToObject(
                            data, offset, data.length - offset, ServiceRequest.class);
                    byte[] result = ProxyServer.getServer().getClientManager()
                            .getClient(req.getSpaceName(), req.getService())
                            .rpc(uuid, OperationDefine.REQ_SERVICE,
                                    SerializableObject.copyOfRange(data, offset, data.length - offset));
                    sendData(ctx.channel(), result);
                } else if (operation == OperationDefine.REQ_STOP.value()) {
                    ProxyServer.stop();
                    sendData(ctx.channel(), OperationDefine.RES_SUCCESS, uuid, null);
                } else {
                    LogManager.errorLog(this.getClass(),
                            new ProxyException("接收到非法数据,非ByteBuf数据,operation=" + operation));
                }
            } catch (ProxyException e) {
                error.incrementAndGet();
                // Log before replying so the cause is recorded even if the reply fails.
                LogManager.errorLog(this.getClass(), e);
                sendException(ctx.channel(), OperationDefine.RES_ERROR, uuid, e);
            } catch (Throwable e) {
                error.incrementAndGet();
                LogManager.errorLog(this.getClass(), new ProxyException("接收到非法数据,非ByteBuf数据", e));
                sendException(ctx.channel(), OperationDefine.RES_ERROR, uuid, e);
            } finally {
                Object startTime = ContextManager.getValue(key);
                if (startTime instanceof Long) {
                    if (System.currentTimeMillis() - (Long) startTime > config.getServiceTimeOut()) {
                        timeOut.incrementAndGet();
                    }
                }
            }
        } catch (Throwable e) {
            error.incrementAndGet();
            // Log first: if the reply below throws, the original cause is still on record.
            LogManager.errorLog(this.getClass(), new ProxyException("接收到非法数据,读取operation或uuid出错", e));
            // The request UUID may not have been parsed, so a fresh one is used for the reply.
            sendException(ctx.channel(), OperationDefine.RES_ERROR, UUID.randomUUID().toString(), e);
        } finally {
            ContextManager.remove(key);
            doing.decrementAndGet();
            buf.release(); // this handler consumes the message, so it must release it
        }
    }

    /**
     * Writes an exception to the channel as a framed message whose payload is the exception
     * text produced by {@code LogManager.exceptionToString}, encoded with
     * {@code International.CHARSET}; if that text is {@code null} no payload is sent.
     *
     * @param channel   target channel
     * @param operation operation code to put in the frame header
     * @param uuid      request identifier to put in the frame header
     * @param e         exception to report
     * @throws UnsupportedEncodingException if {@code International.CHARSET} is not supported
     */
    public static void sendException(Channel channel, OperationDefine operation, String uuid, Throwable e)
            throws UnsupportedEncodingException {
        String message = LogManager.exceptionToString(e);
        byte[] payload = message != null ? message.getBytes(International.CHARSET) : null;
        sendData(channel, operation, uuid, payload);
    }

    /**
     * Writes pre-framed bytes to the channel unchanged.
     *
     * <p>Nothing is written when {@code data} is {@code null} or when the channel is
     * {@code null}, closed, inactive or not writable. Write failures are logged asynchronously.
     *
     * @param channel target channel
     * @param data    complete message bytes to send
     * @throws UnsupportedEncodingException never thrown by this overload; kept for signature
     *                                      compatibility
     */
    public static void sendData(Channel channel, byte[] data) throws UnsupportedEncodingException {
        if (data == null || !isSendable(channel)) {
            return;
        }
        ByteBufAllocator alloc = channel.alloc();
        ByteBuf buf = alloc.buffer(data.length);
        buf.writeBytes(data);
        channel.writeAndFlush(buf).addListener(LOG_SEND_FAILURE);
    }

    /**
     * Frames and writes a message: one operation byte, the UUID bytes, then the optional payload.
     *
     * <p>Nothing is written when the channel is {@code null}, closed, inactive or not writable.
     * Write failures are logged asynchronously.
     *
     * @param channel   target channel
     * @param operation operation code written as the first byte
     * @param uuid      request identifier, encoded with {@code International.CHARSET}
     * @param data      payload, may be {@code null} for an empty body
     * @throws UnsupportedEncodingException if {@code International.CHARSET} is not supported
     */
    public static void sendData(Channel channel, OperationDefine operation, String uuid, byte[] data)
            throws UnsupportedEncodingException {
        if (!isSendable(channel)) {
            return;
        }
        // Encode the UUID before allocating: if this throws, no buffer has been
        // allocated yet and therefore none can leak.
        byte[] uuidBytes = uuid.getBytes(International.CHARSET);
        int len = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
        if (data != null) {
            len = len + data.length;
        }
        ByteBufAllocator alloc = channel.alloc();
        ByteBuf buf = alloc.buffer(len);
        buf.writeByte(operation.value());
        buf.writeBytes(uuidBytes);
        if (data != null) {
            buf.writeBytes(data);
        }
        channel.writeAndFlush(buf).addListener(LOG_SEND_FAILURE);
    }

    /**
     * Tells whether a write should be attempted on the channel.
     */
    private static boolean isSendable(Channel channel) {
        return channel != null && channel.isOpen() && channel.isActive() && channel.isWritable();
    }

    /**
     * Logs the failure and disconnects the client.
     *
     * @see ChannelInboundHandlerAdapter#exceptionCaught(ChannelHandlerContext, Throwable)
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Record why the connection is being dropped; otherwise the cause is lost.
        LogManager.errorLog(ProxyNettyHandler.class, "Channel exception, disconnecting", cause);
        ctx.disconnect();
    }

    /**
     * Flushes pending outbound data once the current read batch is complete.
     *
     * @see ChannelInboundHandlerAdapter#channelReadComplete(ChannelHandlerContext)
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Closes the channel once it has become inactive.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
    }

    /**
     * Heartbeat supervision: closes the channel when a reader-idle event is reported.
     * All other user events are ignored.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            if (((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
                ctx.close();
            }
        }
    }

}
